from hashlib import sha256

from django.db.models import Model
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from guardian.shortcuts import assign_perm

from authentik.core.models import (
    USER_PATH_SYSTEM_PREFIX,
    AuthenticatedSession,
    Token,
    TokenIntents,
    User,
    UserTypes,
)
from authentik.core.signals import password_changed
from authentik.enterprise.providers.ssf.models import (
    EventTypes,
    SSFProvider,
)
from authentik.enterprise.providers.ssf.tasks import send_ssf_events
from authentik.events.middleware import audit_ignore
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_duo.models import DuoDevice
from authentik.stages.authenticator_static.models import StaticDevice
from authentik.stages.authenticator_totp.models import TOTPDevice
from authentik.stages.authenticator_webauthn.models import (
    UNKNOWN_DEVICE_TYPE_AAGUID,
    WebAuthnDevice,
)

USER_PATH_PROVIDERS_SSF = USER_PATH_SYSTEM_PREFIX + "/providers/ssf"


@receiver(post_save, sender=SSFProvider)
def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created: bool, **_):
    """Create service account before provider is saved"""
    identifier = instance.service_account_identifier
    user, _ = User.objects.update_or_create(
        username=identifier,
        defaults={
            "name": f"SSF Provider {instance.name} Service-Account",
            "type": UserTypes.INTERNAL_SERVICE_ACCOUNT,
            "path": USER_PATH_PROVIDERS_SSF,
        },
    )
    assign_perm("add_stream", user, instance)
    token, token_created = Token.objects.update_or_create(
        identifier=identifier,
        defaults={
            "user": user,
            "intent": TokenIntents.INTENT_API,
            "expiring": False,
            "managed": f"goauthentik.io/providers/ssf/{instance.pk}",
        },
    )
    if created or token_created:
        with audit_ignore():
            instance.token = token
            instance.save()


@receiver(pre_delete, sender=AuthenticatedSession)
def ssf_user_session_delete_session_revoked(sender, instance: AuthenticatedSession, **_):
    """Session revoked trigger (users' session has been deleted)

    As this signal is also triggered with a regular logout, we can't be sure
    if the session has been deleted by an admin or by the user themselves."""
    send_ssf_events(
        EventTypes.CAEP_SESSION_REVOKED,
        {
            "initiating_entity": "user",
        },
        sub_id={
            "format": "complex",
            "session": {
                "format": "opaque",
                "id": sha256(instance.session.session_key.encode("ascii")).hexdigest(),
            },
            "user": {
                "format": "email",
                "email": instance.user.email,
            },
        },
    )


@receiver(password_changed)
def ssf_password_changed_cred_change(sender, user: User, password: str | None, **_):
    """Credential change trigger (password changed)"""
    send_ssf_events(
        EventTypes.CAEP_CREDENTIAL_CHANGE,
        {
            "credential_type": "password",
            "change_type": "revoke" if password is None else "update",
        },
        sub_id={
            "format": "complex",
            "user": {
                "format": "email",
                "email": user.email,
            },
        },
    )


device_type_map = {
    StaticDevice: "pin",
    TOTPDevice: "pin",
    WebAuthnDevice: "fido-u2f",
    DuoDevice: "app",
}


@receiver(post_save)
def ssf_device_post_save(sender: type[Model], instance: Device, created: bool, **_):
    if not isinstance(instance, Device):
        return
    if not instance.confirmed:
        return
    device_type = device_type_map.get(instance.__class__)
    data = {
        "credential_type": device_type,
        "change_type": "create" if created else "update",
        "friendly_name": instance.name,
    }
    if isinstance(instance, WebAuthnDevice) and instance.aaguid != UNKNOWN_DEVICE_TYPE_AAGUID:
        data["fido2_aaguid"] = instance.aaguid
    send_ssf_events(
        EventTypes.CAEP_CREDENTIAL_CHANGE,
        data,
        sub_id={
            "format": "complex",
            "user": {
                "format": "email",
                "email": instance.user.email,
            },
        },
    )


@receiver(post_delete)
def ssf_device_post_delete(sender: type[Model], instance: Device, **_):
    if not isinstance(instance, Device):
        return
    if not instance.confirmed:
        return
    device_type = device_type_map.get(instance.__class__)
    data = {
        "credential_type": device_type,
        "change_type": "delete",
        "friendly_name": instance.name,
    }
    if isinstance(instance, WebAuthnDevice) and instance.aaguid != UNKNOWN_DEVICE_TYPE_AAGUID:
        data["fido2_aaguid"] = instance.aaguid
    send_ssf_events(
        EventTypes.CAEP_CREDENTIAL_CHANGE,
        data,
        sub_id={
            "format": "complex",
            "user": {
                "format": "email",
                "email": instance.user.email,
            },
        },
    )
